8
8
from app .config import get_config
9
9
from app .dependencies import verify_query_dto
10
10
from app .mdl .core import get_session_context
11
+ from app .mdl .java_engine import JavaEngineConnector
11
12
from app .mdl .rewriter import Rewriter
12
13
from app .mdl .substitute import ModelSubstitute
13
14
from app .model import (
19
20
from app .model .connector import Connector
20
21
from app .model .data_source import DataSource
21
22
from app .model .validator import Validator
23
+ from app .routers import v2
24
+ from app .routers .v2 .connector import get_java_engine_connector
22
25
from app .util import build_context , pushdown_limit , to_json
23
26
24
27
router = APIRouter (prefix = "/connector" )
25
28
tracer = trace .get_tracer (__name__ )
26
29
30
+ MIGRATION_MESSAGE = "Wren engine is migrating to Rust version now. \
31
+ Wren AI team are appreciate if you can provide the error messages and related logs for us."
32
+
27
33
28
34
@router .post ("/{data_source}/query" , dependencies = [Depends (verify_query_dto )])
29
35
async def query (
@@ -32,6 +38,7 @@ async def query(
32
38
dry_run : Annotated [bool , Query (alias = "dryRun" )] = False ,
33
39
limit : int | None = None ,
34
40
headers : Annotated [str | None , Header ()] = None ,
41
+ java_engine_connector : JavaEngineConnector = Depends (get_java_engine_connector ),
35
42
) -> Response :
36
43
span_name = (
37
44
f"v3_query_{ data_source } _dry_run" if dry_run else f"v3_query_{ data_source } "
@@ -41,44 +48,68 @@ async def query(
41
48
):
42
49
try :
43
50
sql = pushdown_limit (dto .sql , limit )
44
- except Exception :
45
- logger .warning ("Failed to pushdown limit. Using original SQL" )
46
- sql = dto .sql
47
-
48
- rewritten_sql = await Rewriter (
49
- dto .manifest_str , data_source = data_source , experiment = True
50
- ).rewrite (sql )
51
- connector = Connector (data_source , dto .connection_info )
52
- if dry_run :
53
- connector .dry_run (rewritten_sql )
54
- return Response (status_code = 204 )
55
- return ORJSONResponse (to_json (connector .query (rewritten_sql , limit = limit )))
51
+ rewritten_sql = await Rewriter (
52
+ dto .manifest_str , data_source = data_source , experiment = True
53
+ ).rewrite (sql )
54
+ connector = Connector (data_source , dto .connection_info )
55
+ if dry_run :
56
+ connector .dry_run (rewritten_sql )
57
+ return Response (status_code = 204 )
58
+ return ORJSONResponse (to_json (connector .query (rewritten_sql , limit = limit )))
59
+ except Exception as e :
60
+ logger .warning (
61
+ "Failed to execute v3 query, fallback to v2: {}\n " + MIGRATION_MESSAGE ,
62
+ str (e ),
63
+ )
64
+ return await v2 .connector .query (
65
+ data_source , dto , dry_run , limit , java_engine_connector , headers
66
+ )
56
67
57
68
58
69
@router .post ("/dry-plan" )
59
70
async def dry_plan (
60
71
dto : DryPlanDTO ,
61
72
headers : Annotated [str | None , Header ()] = None ,
73
+ java_engine_connector : JavaEngineConnector = Depends (get_java_engine_connector ),
62
74
) -> str :
63
75
with tracer .start_as_current_span (
64
76
name = "dry_plan" , kind = trace .SpanKind .SERVER , context = build_context (headers )
65
77
):
66
- return await Rewriter (dto .manifest_str , experiment = True ).rewrite (dto .sql )
78
+ try :
79
+ return await Rewriter (dto .manifest_str , experiment = True ).rewrite (dto .sql )
80
+ except Exception as e :
81
+ logger .warning (
82
+ "Failed to execute v3 dry-plan, fallback to v2: {}\n "
83
+ + MIGRATION_MESSAGE ,
84
+ str (e ),
85
+ )
86
+ return await v2 .connector .dry_plan (dto , java_engine_connector , headers )
67
87
68
88
69
89
@router .post ("/{data_source}/dry-plan" )
70
90
async def dry_plan_for_data_source (
71
91
data_source : DataSource ,
72
92
dto : DryPlanDTO ,
73
93
headers : Annotated [str | None , Header ()] = None ,
94
+ java_engine_connector : JavaEngineConnector = Depends (get_java_engine_connector ),
74
95
) -> str :
75
96
span_name = f"v3_dry_plan_{ data_source } "
76
97
with tracer .start_as_current_span (
77
98
name = span_name , kind = trace .SpanKind .SERVER , context = build_context (headers )
78
99
):
79
- return await Rewriter (
80
- dto .manifest_str , data_source = data_source , experiment = True
81
- ).rewrite (dto .sql )
100
+ try :
101
+ return await Rewriter (
102
+ dto .manifest_str , data_source = data_source , experiment = True
103
+ ).rewrite (dto .sql )
104
+ except Exception as e :
105
+ logger .warning (
106
+ "Failed to execute v3 dry-plan, fallback to v2: {}\n "
107
+ + MIGRATION_MESSAGE ,
108
+ str (e ),
109
+ )
110
+ return await v2 .connector .dry_plan_for_data_source (
111
+ data_source , dto , java_engine_connector , headers
112
+ )
82
113
83
114
84
115
@router .post ("/{data_source}/validate/{rule_name}" )
@@ -87,17 +118,28 @@ async def validate(
87
118
rule_name : str ,
88
119
dto : ValidateDTO ,
89
120
headers : Annotated [str | None , Header ()] = None ,
121
+ java_engine_connector : JavaEngineConnector = Depends (get_java_engine_connector ),
90
122
) -> Response :
91
123
span_name = f"v3_validate_{ data_source } "
92
124
with tracer .start_as_current_span (
93
125
name = span_name , kind = trace .SpanKind .SERVER , context = build_context (headers )
94
126
):
95
- validator = Validator (
96
- Connector (data_source , dto .connection_info ),
97
- Rewriter (dto .manifest_str , data_source = data_source , experiment = True ),
98
- )
99
- await validator .validate (rule_name , dto .parameters , dto .manifest_str )
100
- return Response (status_code = 204 )
127
+ try :
128
+ validator = Validator (
129
+ Connector (data_source , dto .connection_info ),
130
+ Rewriter (dto .manifest_str , data_source = data_source , experiment = True ),
131
+ )
132
+ await validator .validate (rule_name , dto .parameters , dto .manifest_str )
133
+ return Response (status_code = 204 )
134
+ except Exception as e :
135
+ logger .warning (
136
+ "Failed to execute v3 validate, fallback to v2: {}\n "
137
+ + MIGRATION_MESSAGE ,
138
+ str (e ),
139
+ )
140
+ return await v2 .connector .validate (
141
+ data_source , rule_name , dto , java_engine_connector , headers
142
+ )
101
143
102
144
103
145
@router .get ("/{data_source}/functions" )
@@ -120,17 +162,26 @@ async def model_substitute(
120
162
data_source : DataSource ,
121
163
dto : TranspileDTO ,
122
164
headers : Annotated [str | None , Header ()] = None ,
165
+ java_engine_connector : JavaEngineConnector = Depends (get_java_engine_connector ),
123
166
) -> str :
124
167
span_name = f"v3_model-substitute_{ data_source } "
125
168
with tracer .start_as_current_span (
126
169
name = span_name , kind = trace .SpanKind .SERVER , context = build_context (headers )
127
170
):
128
- sql = ModelSubstitute (data_source , dto .manifest_str ).substitute (dto .sql )
129
- Connector (data_source , dto .connection_info ).dry_run (
130
- await Rewriter (
131
- dto .manifest_str ,
132
- data_source = data_source ,
133
- experiment = True ,
134
- ).rewrite (sql )
135
- )
136
- return sql
171
+ try :
172
+ sql = ModelSubstitute (data_source , dto .manifest_str ).substitute (dto .sql )
173
+ Connector (data_source , dto .connection_info ).dry_run (
174
+ await Rewriter (
175
+ dto .manifest_str ,
176
+ data_source = data_source ,
177
+ experiment = True ,
178
+ ).rewrite (sql )
179
+ )
180
+ return sql
181
+ except Exception as e :
182
+ logger .warning (
183
+ "Failed to execute v3 model-substitute, fallback to v2: {}" , str (e )
184
+ )
185
+ return await v2 .connector .model_substitute (
186
+ data_source , dto , java_engine_connector , headers
187
+ )
0 commit comments