Commit 285b224

eval query support and a few fixes (#258)
## Summary

`eval("select ... from ... where ...")` is now supported. Easier for exploration.

## Checklist

- [ ] Added Unit Tests
- [ ] Covered by existing CI
- [x] Integration tested
- [x] Documentation update

## Summary by CodeRabbit

- **Documentation**
  - Updated setup instructions for the Chronon Python interactive runner
  - Modified environment variable paths and gateway service instructions
- **New Features**
  - Added ability to specify a result limit when evaluating queries
  - Enhanced SQL query evaluation with error handling
- **Bug Fixes**
  - Simplified interactive usage example
  - Updated Spark session method in DataFrame creation
1 parent 4367262 commit 285b224
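The headline change routes a plain string through `evalQuery` with a row limit instead of the previous TODO stub. A minimal sketch of that contract, where `FakeEvaluator` and `eval_conf` are hypothetical stand-ins for the py4j gateway entry point and the real `eval` (not the actual Chronon API):

```python
# Illustrative sketch of the new eval() contract: a plain string is treated
# as a SQL query and evaluated with a row limit.
# FakeEvaluator and eval_conf are hypothetical stand-ins, not the real API.

class FakeEvaluator:
    """Stands in for the py4j gateway entry point."""

    def evalQuery(self, query, limit):
        # Pretend the warehouse returned ten rows, then apply the limit.
        rows = [{"id": i} for i in range(10)]
        return rows[:limit]


def eval_conf(obj, limit=5, evaluator=None):
    evaluator = evaluator or FakeEvaluator()
    if isinstance(obj, str):
        return evaluator.evalQuery(obj, limit)
    raise Exception(f"Unsupported object type for: {obj}")


result = eval_conf("select id from visits where id < 100", limit=3)
print(len(result))  # 3
```

The limit default of 5 mirrors the hard-coded value this commit replaces with a parameter.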

File tree

3 files changed: +38 −10 lines changed

api/py/ai/chronon/repo/interactive.md

Lines changed: 14 additions & 6 deletions

````diff
@@ -22,29 +22,37 @@ Once we bulk it out - we can use this to power
 2. in your `~/.bashrc` or `~/.zshrc` add `SPARK_HOME` & `CHRONON_ROOT` env vars
 ```
 export SPARK_HOME=/Users/nikhilsimha/spark-3.5.4-bin-hadoop3 # choose dir where you unpacked spark
-export CHRONON_ROOT=/Users/nikhilsimha/repos/chronon/api/py/test/sample # choose your conf dir
+export CHRONON_ROOT=/Users/nikhilsimha/repos/etsy/zipline # choose your conf dir
 ```

-### Running gateway
+3. Copy the chronon jar from your blob store and set it to `CHRONON_SPARK_JAR`.
+```
+# for EXAMPLE, in your ~/.zshrc
+export CHRONON_SPARK_JAR=/Users/nikhilsimha/Downloads/chronon_spark_assembly.jar
+```

-1. Load parquet files into `$CHRONON_ROOT/local_warehouse/<namespace>/<table>`

+### Running gateway
+
+1. Load parquet files into `$CHRONON_ROOT/local_warehouse/<namespace>/<table>`

 2. Start gateway service - from chronon root
 ```
-> sbt spark/assembly && ./scripts/interactive/gateway.sh
+> ./scripts/interactive/gateway.sh
 ```

 ### Using interactively

 1. You can interact with your conf objects in vscode in a notebook like so..

 ```py
+
+# import source, I do this by running cell from the py file actually with "#%%" in vscode.
 from group_bys.etsy_search.visit_beacon import source
+# import runner
 from ai.chronon.repo.interactive import eval

-df = eval(source)
-df.show()
+eval(source)
 ```

 ```
````
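The updated setup steps amount to three required environment variables. A tiny pre-flight check can catch a missing one before starting the gateway; the variable names come from the doc above, but the helper itself is a hypothetical convenience, not part of the Chronon repo:

```python
import os

# The three variables the setup steps above ask for; the check itself is
# a hypothetical convenience, not part of the Chronon repo.
REQUIRED_VARS = ["SPARK_HOME", "CHRONON_ROOT", "CHRONON_SPARK_JAR"]


def missing_env(env=None):
    """Return the required variables that are unset or empty."""
    env = os.environ if env is None else env
    return [name for name in REQUIRED_VARS if not env.get(name)]


# Example against a fake environment with only SPARK_HOME set:
print(missing_env({"SPARK_HOME": "/opt/spark"}))
# ['CHRONON_ROOT', 'CHRONON_SPARK_JAR']
```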

api/py/ai/chronon/repo/interactive.py

Lines changed: 10 additions & 4 deletions

```diff
@@ -4,7 +4,7 @@
 import ai.chronon.api.ttypes as thrift


-def eval(obj):
+def eval(obj, limit=5):
     """
     utility function to run conf's in an interactive environment.
@@ -22,7 +22,7 @@ def eval(obj):
     evaluator = gateway.entry_point

     if isinstance(obj, str):
-        evaluator.evalQuery(obj)  # TODO
+        return _to_df(evaluator.evalQuery(obj, limit))

     func = None
@@ -41,9 +41,12 @@ def eval(obj):
     elif isinstance(obj, thrift.Model):
         func = evaluator.evalModel  # TODO

+    else:
+        raise Exception(f"Unsupported object type for: {obj}")
+
     thrift_str = ser.thrift_simple_json(obj)

-    eval_result = func(thrift_str, 5)
+    eval_result = func(thrift_str, limit)

     return _to_df(eval_result)
@@ -57,4 +60,7 @@ def _to_df(eval_result):
         raise Exception(error)

     df = eval_result.df()
-    return DataFrame(df, df.sqlContext())
+
+    py_df = DataFrame(df, df.sparkSession())
+
+    return py_df
```
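`_to_df` above follows an error-or-payload convention: the evaluation result carries either an error string or a DataFrame, and the Python side raises on the former. A self-contained sketch of that convention, where `EvalResult` and `to_result` are illustrative stand-ins for the py4j objects, not the real classes:

```python
# Sketch of the error-or-payload convention used by _to_df: raise when the
# result carries an error, otherwise unwrap the payload.
# EvalResult and to_result are illustrative stand-ins for the py4j objects.

class EvalResult:
    def __init__(self, error=None, payload=None):
        self._error = error
        self._payload = payload

    def error(self):
        return self._error

    def df(self):
        return self._payload


def to_result(eval_result):
    error = eval_result.error()
    if error is not None:
        raise Exception(error)
    return eval_result.df()


print(to_result(EvalResult(payload=[1, 2, 3])))  # [1, 2, 3]
```

Keeping the error as data until the last moment is what lets the Scala side return an `EvaluationResult` from a `catch` block instead of throwing across the py4j boundary.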

spark/src/main/scala/ai/chronon/spark/interactive/Evaluator.scala

Lines changed: 14 additions & 0 deletions

```diff
@@ -33,6 +33,20 @@ class Evaluator(localWarehouse: LocalWarehouse) {
     }
   }

+  def evalQuery(query: String, limit: Int = 5): EvaluationResult = {
+
+    try {
+
+      EvaluationResult(null, localWarehouse.runSql(query).limit(limit))
+
+    } catch {
+
+      case analysisException: AnalysisException =>
+        EvaluationResult(analysisException.getMessage, null)
+
+    }
+  }
+
   private def runSourceBundle(sourceSqlBundle: SourceSqlBundle): DataFrame = {

     val missingTables = sourceSqlBundle.tables -- localWarehouse.existingTables
```
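The Scala `evalQuery` above folds an `AnalysisException` into the result instead of letting it propagate to the caller. A hedged Python analogue of that shape, where `run_sql`, the `(error, rows)` tuple, and `ValueError` (standing in for `AnalysisException`) are all assumptions for illustration:

```python
# Python analogue of the Scala evalQuery above: run the query, apply the
# row limit, and fold any analysis error into the result rather than raising.
# run_sql, the (error, rows) tuple, and ValueError (for AnalysisException)
# are illustrative assumptions, not the real Spark API.

def eval_query(run_sql, query, limit=5):
    try:
        return (None, run_sql(query)[:limit])
    except ValueError as analysis_error:
        return (str(analysis_error), None)


def fake_run_sql(query):
    """Toy warehouse: rejects a known-bad column, else returns ten rows."""
    if "bad_col" in query:
        raise ValueError("cannot resolve 'bad_col'")
    return [{"n": i} for i in range(10)]


error, rows = eval_query(fake_run_sql, "select n from t", limit=2)
print(error, len(rows))  # None 2
```

Exactly one of the two slots is populated, which matches how `_to_df` on the Python side decides whether to raise or wrap the DataFrame.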

0 commit comments