1
+ #
2
+ # The script below is a standalone script that can be used to run a SQL query locally.
3
+ #
4
+ # Arguments:
5
+ # - sql: the SQL query to run, read from stdin
6
+ #
7
+ # Environment variables:
8
+ # - WREN_MANIFEST_JSON_PATH: path to the manifest JSON file
9
+ # - REMOTE_FUNCTION_LIST_PATH: path to the function list file
10
+ # - CONNECTION_INFO_PATH: path to the connection info file
11
+ # - DATA_SOURCE: data source name
12
+ #
13
+
14
+ import base64
15
+ import json
16
+ import os
17
+ import sqlglot
18
+ import sys
19
+
20
+ from dotenv import load_dotenv
21
+ from wren_core import SessionContext
22
+ from app .model .data_source import BigQueryConnectionInfo
23
+ from app .model .data_source import DataSourceExtension
24
+
25
# The SQL text must be piped or redirected in; refuse to run when stdin is
# an interactive terminal.
if sys.stdin.isatty():
    print(
        "please provide the SQL query via stdin, e.g. `python query_local_run.py < test.sql`",
        file=sys.stderr,
    )
    sys.exit(1)

sql = sys.stdin.read()

# load_dotenv() lets the variables below be supplied through a .env file.
load_dotenv()
manifest_json_path = os.getenv("WREN_MANIFEST_JSON_PATH")
function_list_path = os.getenv("REMOTE_FUNCTION_LIST_PATH")
connection_info_path = os.getenv("CONNECTION_INFO_PATH")
data_source = os.getenv("DATA_SOURCE")

# Welcome message: echo the effective configuration and the query.
print("### Welcome to the Wren Core Query Runner ###")
print("#")
print("# Manifest JSON Path:", manifest_json_path)
print("# Function List Path:", function_list_path)
print("# Connection Info Path:", connection_info_path)
print("# Data Source:", data_source)
print("# SQL Query:", sql)
print("#")

# Both paths are opened unconditionally below, so fail here with a readable
# message instead of a TypeError from open(None).
missing_vars = [
    name
    for name, value in (
        ("WREN_MANIFEST_JSON_PATH", manifest_json_path),
        ("CONNECTION_INFO_PATH", connection_info_path),
    )
    if not value
]
if missing_vars:
    print(
        "missing required environment variable(s): " + ", ".join(missing_vars),
        file=sys.stderr,
    )
    sys.exit(1)

# Read the manifest, re-serialize it as a JSON string and base64-encode it,
# which is the form handed to SessionContext below.
with open(manifest_json_path, encoding="utf-8") as file:
    mdl = json.load(file)
json_str = json.dumps(mdl)
encoded_str = base64.b64encode(json_str.encode("utf-8")).decode("utf-8")

with open(connection_info_path, encoding="utf-8") as file:
    connection_info = json.load(file)

print("### Starting the session context ###")
print("#")
session_context = SessionContext(encoded_str, function_list_path)
planned_sql = session_context.transform_sql(sql)
print("# Planned SQL:", planned_sql)

# Transpile the planned SQL: it is read as Trino dialect and written in the
# dialect named by DATA_SOURCE.
dialect_sql = sqlglot.transpile(planned_sql, read="trino", write=data_source)[0]
print("# Dialect SQL:", dialect_sql)
print("#")

# Execution is only wired up for BigQuery; for anything else the planned and
# dialect SQL above are the only output.
if data_source != "bigquery":
    # Report on stderr with a non-zero status, like the stdin usage error.
    print("Unsupported data source:", data_source, file=sys.stderr)
    sys.exit(1)

connection_info = BigQueryConnectionInfo.model_validate_json(json.dumps(connection_info))
connection = DataSourceExtension.get_bigquery_connection(connection_info)
# Only the first 10 rows are requested.
df = connection.sql(dialect_sql).limit(10).to_pandas()
print("### Result ###")
print("")
print(df)
0 commit comments